83 Python并发编程之多协程

571次阅读
没有评论

共计 4589 个字符,预计需要花费 12 分钟才能阅读完成。

引入

我们知道一个线程同一时间内只能被操作系统分配一个 CPU 资源, 我们可以基于多进程实现并发, 也可以基于多线程实现并发, CPU正在运行一个任务, 有两种情况下会被切去执行其它任务, 一种是该任务发生了阻塞, 另一是该任务运行时间过长或者被其他优先级更高的任务夺走 CPU, 对于单线程来说, 如果一个单线程下运行了多个任务, 那么就不可避免的出现I/O 操作, 一旦一个任务出现阻塞的情况, 那么整个线程将处于阻塞状态 (因为一旦阻塞, CPU 资源将会被夺走), 但是如果我们能在自己的应用程序中 (即用户级别, 非操作系统级别) 控制单线程下多个任务能在一个任务遇到 I/O 之后立马切换到另一个任务, 那么我们就可以保证该线程能最大限度的保持就绪状态 (也就是随时能进入运行态), 相当于我们在应用程序级别将I/O 操作隐藏起来, 迷惑操作系统, 让其看到的状态就是一直在计算, I/O比较少, 于是操作系统就会将更多的 CPU 资源分配给该线程

一. 什么是协程

1. 回顾并发的本质

  • CPU 在多个任务之间来回切换, 并且在切换之前保存好当前的状态

2. 协程是什么

  • 基于单线程下实现的并发, 又称为微线程(Coroutine), 它是一种用户态的轻量级线程, 即协程是由应用程序自己控制调度的
  • 与进程线程一样, 并不是真实存在的东西, 只是程序员为了方便理解虚拟出来的概念

3. 内核级别和用户级别的调度

  • 线程的执行需要被操作系统分配 CPU 资源, 这属于内核级别的调度
  • 而单个线程内的多个任务之间的调度是由应用程序自己实现的, 这属于用户级别的调度

4. 内核级别与用户级别在线程中的调度比较

  • 优点 :
  • 协程的开销更小, 更轻量, 属于应用程序级别的切换, 操作系统完全感觉不到
  • 单线程下实现并发效果, 更大限度的利用 CPU 资源 (可以是一个程序开启多个进程, 一个进程开启多个线程, 每个线程开启协程)
  • 特点 :
  • 自己的应用程序实现的多个任务之间的调度, 遇到 I / O 就切, 可以将单线程的 I / O 降到最低
  • 缺点 :
  • 协程是在单线程下实现的, 所以它无法利用多核
  • 引入协程, 就需要检测该线程下所有的 I / O 行为, 实现遇到 I / O 就切, 少一个都不行, 一旦协程出现了阻塞, 也将会阻塞整个线程

5. 使用 yieldd 模拟协程的效果

  • yield 是在生成器那一章学到的知识点, 它可以保存状态, 这与操作系统保存线程的状态很像, 但它是属于代码级别控制的, 更轻量
import time

def Foo():
    for i in range(100):
        print(f"--->Foo:{i}")
        yield  # 保存状态

def Bar():
    f = Foo()
    for i in range(10000000):
        i += 1
        next(f)

start_time = time.time()
Bar()
stop_time = time.time()
print(f"user time:{stop_time-start_time}")

ps : yield无法检测 I/O, 无法实现遇到I/O 就进行切换

6. 总结协程特点

  • 在单线程实现的并发
  • 修改共享数据不需要加锁(不会造成同时修改的情况)
  • 用户程序里自己保存多个控制流的上下文栈
  • 一个协程遇到 I/O 操作会自动切换到其他协程

ps : 如何实现自动检测 I/O, 上面模拟使用的 yield 以及 greenlet 都无法做到, 于是以下就开始介绍 gevent 模块 (select 机制)

二.Gevent 模块介绍

1. 安装 Gevent

🍋"cmd" 或 "pycharm" 的 "Terminal"
pip3 install gevent


2. 什么是 gevent 模块

  • Geven t 是 Python 的第三方库, 它为各种并发和网络相关的任务提供了整洁的 API, 我们可以通过gevent 轻松实现并发同步或异步编程

  • gevent 中用到的主要模式是 Greenlet, 它是以C 扩展模块形式接入 Python 的轻量级协程

  • Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度

3. 使用方法

  • 常用方法
方法 作用
gevent.spawn(func,args/kwargs) 创建一个协程对象, 第一个参数是函数名, 后面的参数是函数的位置参数或者关键字参数
[协程对象].join() 等待协程对象的结束
gevent.joinall([对象 1, 对象 2]) 等待多个协程对象的结束, 参数是一个列表, 放多个协程对象
[协程对象].value 拿到协程对象的返回值
  • 示例: 遇到 I/O 自动切换任务
import gevent
import time

def eat(name):
    print(f"{name}正在吃东西 ")
    gevent.sleep(3)
    print(f"{name}吃完了 ")
    return "i am eat"

def play(name):
    print(f"{name}正在玩手机 ")
    gevent.sleep(1)
    print(f"{name}玩够了手机 ")
    return "i am play"

start_time = time.time()
g1 = gevent.spawn(eat," 派大星 ")
g2 = gevent.spawn(play," 海绵宝宝 ")

# g1.join()  # 等待协程对象 g1 结束
# g2.join()  # 等待协程对象 g2 结束
gevent.joinall([g1,g2])  # 等待协程对象 g1 和 g2 结束

print(g1.value)  # 获取协程对象 g1 的返回值
print(g2.value)  # 获取协程对象 g2 的返回值

print(f" 用时:{time.time()-start_time}")

''' 输出
派大星正在吃东西
海绵宝宝正在玩手机
海绵宝宝玩够了手机
派大星吃完了
i am eat
i am play
用时:3.0330262184143066
'''

4.gevent 不支持识别其他操作的 I /O

  • time.sleep(2) 或者其他类型的I/O, gevent 模块无法识别, 只能识别 gevent.sleep(2)

  • 解决方法 : 使用猴子补丁让其能识别 from gevent import monkey;monkey.patch_all(), 放在文件开头

from gevent import monkey;monkey.patch_all()
import gevent
import time
from threading import current_thread

def eat(name):
    print(current_thread().name)  # 查看该线程的名字
    print(f"{name}正在吃东西 ")
    time.sleep(3)
    print(f"{name}吃完了 ")
    return "i am eat"

def drink(name):
    print(current_thread().name)  # 查看该线程的名字
    print(f"{name}正在喝汤 ")
    time.sleep(2)
    print(f"{name}把汤喝完了 ")
    return "i am drink"

def play(name):
    print(current_thread().name)  # 查看该线程的名字
    print(f"{name}正在玩手机 ")
    time.sleep(1)
    print(f"{name}玩够了手机 ")
    return "i am play"

start_time = time.time()
g1 = gevent.spawn(eat," 派大星 ")
g2 = gevent.spawn(drink," 章鱼哥 ")
g3 = gevent.spawn(play," 海绵宝宝 ")

# g1.join()  # 等待协程对象 g1 结束
# g2.join()  # 等待协程对象 g2 结束
# g3.join()  # 等待协程对象 g3 结束
gevent.joinall([g1,g2,g3])  # 等待协程对象 g1 和 g2 结束

print(g1.value)  # 获取协程对象 g1 的返回值
print(g2.value)  # 获取协程对象 g2 的返回值
print(g3.value)  # 获取协程对象 g3 的返回值

print(f" 用时:{time.time()-start_time}")

''' 输出
Dummy-1
派大星正在吃东西
Dummy-2
章鱼哥正在喝汤
Dummy-3
海绵宝宝正在玩手机
海绵宝宝玩够了手机
章鱼哥把汤喝完了
派大星吃完了
i am eat
i am drink
i am play
用时:3.0230190753936768
'''
🍓# 可以查看到三个线程的名字 : Dummy-1、Dummy-2、Dummy-3、(都是假线程)

三. 使用协程编写 socket-TCP 程序示例

1. 服务端

from gevent import monkey;monkey.patch_all()  # 添加猴子补丁
import gevent
from socket import *

# 建链接循环
def link(ip,port):
    try:
        server = socket(AF_INET, SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
    except Exception as E:
        print(E);return
    while 1:
            conn,addr = server.accept()
            gevent.spawn(communication,conn)  
            # 建立链接成功之后开启一个协程任务进行通信循环

# 通信循环
def communication(conn):
    while 1:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            conn.send(data.upper())
        except Exception as E:
            print(E);break
    conn.close()

if __name__ == '__main__':
    g1 = gevent.spawn(link,"127.0.0.1",8090)  # 先启动一个建立链接循环的协程任务
    g1.join()

2. 客户端

  • 客户端的编写可以是常规编写, 然后复制出多台客户端
from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect(("127.0.0.1",8090))

while 1:
    user = input(">>").strip()
    if len(user) == 0:continue
    client.send(user.encode("utf-8"))
    data = client.recv(1024)
    print(data.decode("utf-8"))
  • 也可以使用多线程开启多个客户端, 不过公用同一个终端屏幕, 效果不明显
from threading import Thread,current_thread
from socket import *

def connection(ip,port,i):
    client = socket(AF_INET,SOCK_STREAM)
    client.connect((ip,port))
    while 1:
        client.send(f" 客户端编号:{i}, 名字:{current_thread().name}".encode("utf-8"))
        data = client.recv(1024)
        print(data.decode("utf-8"))

if __name__ == '__main__':
    for i in range(10):  # 多线程开启 10 个客户端进行与服务端的连接
        t = Thread(target=connection,args=("127.0.0.1",8090,i))
        t.start()
正文完
 
shawn
版权声明:本站原创文章,由 shawn 2023-06-16发表,共计4589字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)